-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kvcoord: Implement CloseStream for MuxRangeFeed #108335
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 6 files at r1, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/kv/kvpb/api.proto
line 2993 at r1 (raw file):
// When this bit is set, the server should attempt, as best effort, to // quickly terminate rangefeed for this stream. bool close_stream = 6;
It's unfortunate that we're overloading RangeFeedRequest
both for starting and closing a stream, it makes it harder to reason about how different fields interact. For example, when I set close_stream
, what happens if I don't set stream_id
, or pass a wrong span
? MuxRangeFeed should really take a separate oneof
message type, but I guess we're too far down this path now. :(
Let's at least document that only stream_id
matters when we set close_stream
. Also, I think we should guard against stream_id
and close_stream
being set in Node.RangeFeed()
and error out.
We should call out that callers must check the V23_2
version gate before using this. Otherwise, if a 23.2 node sends a close_stream
request to a 23.1 node, it will result in a new rangefeed being spun up.
pkg/server/node.go
line 224 at r1 (raw file):
} metaClosedMuxRangeFeedStreams = metric.Metadata{ Name: "rpc.streams.mux_rangefeed.closed_streams",
I'm not sure we need this? It's slightly ambiguous, in that a client can close streams both by closing the connection and explicitly closing each stream. I think I'd just avoid the confusion and not have this, unless you feel strongly about it.
pkg/server/node.go
line 1819 at r1 (raw file):
// nice to clean up anyway. activeStreams.Range(func(key, value any) bool { value.(*setRangeIDEventSink).cancel()
Let's do ctx, cancel := context.WithCancel(stream.Context()); defer cancel()
instead, and use ctx
throughout. That will cancel all the child contexts too.
pkg/server/node.go
line 1832 at r1 (raw file):
if req.CloseStream { if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) { return errors.AssertionFailedf("unexpected CloseStream(%d) request (min version %s)",
The RPC server should never reject functionality based on version gates, the client gets to decide. Otherwise, version gate rollout races will cause spurious errors. See:
cockroach/pkg/clusterversion/cockroach_versions.go
Lines 54 to 57 in c6d599f
// At the same time, with requests/RPCs originating at other crdb nodes, the | |
// initiator of the request gets to decide what's supported. A node should | |
// not refuse functionality on the grounds that its view of the version gate | |
// is as yet inactive. Consider the sender: |
pkg/server/node.go
line 1846 at r1 (raw file):
// just before we receive close request. So, just print out a warning. if log.V(1) { log.Infof(stream.Context(), "ignoring likely benign CloseStream race for stream %d", req.StreamID)
Let's not speculate on why the client sent this, and just say "closing unknown rangefeed stream ID %d" or something.
pkg/server/node.go
line 1875 at r1 (raw file):
streamSink.cancel() if streamSink.closedByClient.Load() && errors.Is(err, context.Canceled) {
I think we can avoid closedByClient
here if we instead check streamSink.ctx.Err() != nil
. The client doesn't care about the stream any more anyway.
pkg/server/node.go
line 1880 at r1 (raw file):
// so that kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED gets returned // to the client. err = nil
Let's explicitly set err = kvpb.NewRangeFeedRetryError(REASON_RANGEFEED_CLOSED)
here. Otherwise, in the case of a version gate race, the code below can fall through to REASON_REPLICA_REMOVED
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making the effort to get this in!
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker)
pkg/kv/kvpb/api.proto
line 2993 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
It's unfortunate that we're overloading
RangeFeedRequest
both for starting and closing a stream, it makes it harder to reason about how different fields interact. For example, when I setclose_stream
, what happens if I don't setstream_id
, or pass a wrongspan
? MuxRangeFeed should really take a separateoneof
message type, but I guess we're too far down this path now. :(Let's at least document that only
stream_id
matters when we setclose_stream
. Also, I think we should guard againststream_id
andclose_stream
being set inNode.RangeFeed()
and error out.We should call out that callers must check the
V23_2
version gate before using this. Otherwise, if a 23.2 node sends aclose_stream
request to a 23.1 node, it will result in a new rangefeed being spun up.
Added NewCloseStreamRequest request helper method as a safety to make sure v23.2 gate is there (since we don't have real callers right now).
Added checks in regular rangefeed.
pkg/server/node.go
line 224 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
I'm not sure we need this? It's slightly ambiguous, in that a client can close streams both by closing the connection and explicitly closing each stream. I think I'd just avoid the confusion and not have this, unless you feel strongly about it.
Sure -- just wanted to have at least some observability into explicit close requests. But i'm okay dropping this for now.
pkg/server/node.go
line 1819 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Let's do
ctx, cancel := context.WithCancel(stream.Context()); defer cancel()
instead, and usectx
throughout. That will cancel all the child contexts too.
Nice and neat. Thanks.
pkg/server/node.go
line 1832 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
The RPC server should never reject functionality based on version gates, the client gets to decide. Otherwise, version gate rollout races will cause spurious errors. See:
cockroach/pkg/clusterversion/cockroach_versions.go
Lines 54 to 57 in c6d599f
// At the same time, with requests/RPCs originating at other crdb nodes, the // initiator of the request gets to decide what's supported. A node should // not refuse functionality on the grounds that its view of the version gate // is as yet inactive. Consider the sender:
Cool; makes sense. To be safe (i.e. not to forget to use correct version gate when we need to use this RPC), I also added a utility method NewCloseStreamRequest
which errors out if v23.2 is not active.
pkg/server/node.go
line 1875 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
I think we can avoid
closedByClient
here if we instead checkstreamSink.ctx.Err() != nil
. The client doesn't care about the stream any more anyway.
I'm a bit worried relying just on ctx.Err() -- it can be non-nil when e.g. parent context is cancelled too; in that case I think it's correct to return an error.
What I can do is do "LoadAndDelete" above -- if context was cancelled, and !loaded -- then I think we know the stream was explicitly closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 6 files at r2, 2 of 2 files at r3, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/kv/kvpb/api.proto
line 2993 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Added NewCloseStreamRequest request helper method as a safety to make sure v23.2 gate is there (since we don't have real callers right now).
Added checks in regular rangefeed.
May still want to add a comment here, since caller can just construct this directly if they want to.
pkg/server/node.go
line 1875 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I'm a bit worried relying just on ctx.Err() -- it can be non-nil when e.g. parent context is cancelled too; in that case I think it's correct to return an error.
What I can do is do "LoadAndDelete" above -- if context was cancelled, and !loaded -- then I think we know the stream was explicitly closed.
Ok, makes sense -- I don't have a good intuition for the error handling here. I'd still be inclined to check ctx.Err() != nil
here, since it's generally better form -- context.Canceled
can originate from a child context too, without our context being cancelled. It doesn't matter in this case though because streamClosedByClient
would be false. Your call.
pkg/server/node.go
line 1800 at r2 (raw file):
// cancelled once MuxRangeFeed exits. ctx, cancel := context.WithCancel(n.AnnotateCtx(stream.Context())) cancel()
defer cancel()
Also, still a few sites that use stream.Context()
, let's make sure we use ctx throughout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker)
pkg/kv/kvpb/api.proto
line 2993 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
May still want to add a comment here, since caller can just construct this directly if they want to.
Ack.
pkg/server/node.go
line 1875 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Ok, makes sense -- I don't have a good intuition for the error handling here. I'd still be inclined to check
ctx.Err() != nil
here, since it's generally better form --context.Canceled
can originate from a child context too, without our context being cancelled. It doesn't matter in this case though becausestreamClosedByClient
would be false. Your call.
Okay; I'm good using streamCtx.Err != nil
1197210
to
ec2106c
Compare
Extend MuxRangeFeed protocol to support explicit, caller initiated CloseStream operation. The caller may decide to stop receiving events for a particular stream, which is part of MuxRangeFeed. The caller may issue a request to MuxRangeFeed server to close the stream. The server will cancel underlying range feed, and return a `RangeFeedRetryError_REASON_RANGEFEED_CLOSED` error as a response. Note, current mux rangefeed clinet does not use this request. The code to support cancellation is added pre-emptively in case this functionality will be required in the future to support restarts due to stuck rangefeeds. Epic: CRDB-26372 Release note: None
bors r+ |
Build succeeded: |
Extend MuxRangeFeed protocol to support explicit,
caller initiated CloseStream operation.
The caller may decide to stop receiving events
for a particular stream, which is part of MuxRangeFeed. The caller may issue a request to MuxRangeFeed server to close the stream. The server will cancel underlying range feed, and return a
RangeFeedRetryError_REASON_RANGEFEED_CLOSED
error as a response.Note, current mux rangefeed clinet does not use this request. The code to support cancellation is added pre-emptively in case this functionality will be required in the future to support restarts due to stuck rangefeeds.
Epic: CRDB-26372
Release note: None